---
title: "Part 11: Logging"
---

Starting with this tutorial, we will add more functionality to our proxy. This part focuses on logging: how to capture information about incoming requests, the responses returned to clients, and related statistics that can help developers and auditors (for example, if your company is required to keep track of all data).

## Problem

When it comes to logging, the more information we record, the more useful it is for debugging and statistics. We want to capture information such as:
* Request
  * Header
  * Body
* Response
  * Header
  * Body
* Request Time
* Response Time
* Response receive completion time
* Remote host IP & port
* Local IP & port
* Extended Information

Previous articles already cover how to retrieve this information. In this tutorial we will store it in JSON format, which is easy for machines and logging or scraping services to process.

## Logger plugin

We will extend the proxy by adding a new plugin (module), which is the recommended practice for Pipy. In [Part 5: Plugins](/tutorial/05-plugins), we explained the usage of the *use* filter and added the *response* sub-pipeline. If you have been following along, you should remember this. If you are new or skipped that tutorial, we highly recommend going back to it first.

```js
.pipeline('request')
  .use(
    [
      'plugins/router.js',
      'plugins/default.js',
    ],
    'request',
    'response',
    () => __turnDown
  )
```

For three plugins A, B, and C listed in that order, the execution sequence of their sub-pipelines is:

* request: A > B > C
* response: C > B > A
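
The ordering can be modelled in a few lines of plain JavaScript. This is only a sketch, not Pipy code: `runChain` and the plugin names are hypothetical, and the model ignores the `__turnDown` condition that can stop the chain early.

```javascript
// Plain-JavaScript model (not Pipy code) of the plugin ordering.
// runChain and the plugin names are hypothetical, for illustration only.
function runChain(plugins) {
  const trace = [];
  // Request phase: plugins are visited in the order they are listed.
  for (const name of plugins) {
    trace.push(`request:${name}`);
  }
  // Response phase: the same plugins are visited in reverse order.
  for (const name of [...plugins].reverse()) {
    trace.push(`response:${name}`);
  }
  return trace;
}

const pluginTrace = runChain(['A', 'B', 'C']);
console.log(pluginTrace.join(' > '));
// prints "request:A > request:B > request:C > response:C > response:B > response:A"
```

This is why a logger placed early in the list sees the request early and the response late.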

Let's add our plugin (logger.js) to the configuration. We will add code to this plugin shortly:

```js
{
  "listen": 8000,
  "plugins": [
    "plugins/router.js",
    "plugins/logger.js",
    "plugins/balancer.js",
    "plugins/default.js"
  ]
}
```

Like the other plugins, our logger plugin has its own configuration file. For this tutorial, it stores the URL of the host that will receive and store the log information. For testing purposes we will mock this service, but your use case might vary, and you might already have a logging stack set up, so adjust this configuration to your use case.

```js
{
  "logURL": "http://127.0.0.1:8123/log"
}
```

> The collected logs are sent in batches over the network directly to a storage service such as ElasticSearch or ClickHouse. If the network is reliable enough, this avoids local disk I/O and the deployment of log collection tools such as FileBeat and Fluentd, which reduces the complexity of the architecture. Because the logs are buffered and reported in batches, the pressure on the storage service is reduced as well.


Let's write the code of our logger plugin.

### Global Variables

We certainly do not want to report the collected request and response information separately, which would make the data difficult to store and query. Instead, we want to aggregate the logging information and compute some statistics. This requires us to record the request before forwarding it to the target host and then combine it with the response received from that host, so we need a mechanism to store the information in between. This is where *Global Variables* come into play. If the term *Global Variable* scares you, you probably haven't read our previous tutorials, especially the *Context* section of [Concepts](/intro/concepts), where we explained that *Global Variables* are context-specific and don't work like the global variables you know from procedural programming languages.

```js
// `config` is assumed to hold the parsed logger configuration shown above
pipy({
  _logURL: new URL(config.logURL),
  _request: null,
  _requestTime: 0,
  _responseTime: 0,
})

.export('logger', {
  __logInfo: {},
})
```

Here we also export (expose) a module variable, `__logInfo`, to make it easier for other modules to extend the log contents. With the global variables defined, it is time to record data during requests and responses.

### logger.js

Let's first write the skeleton:

```js
//...
.pipeline('request')
.pipeline('response')
```

These are the two sub-pipelines the plugin exposes. Log recording is an auxiliary service and shouldn't affect the core functionality of our proxy, so to work transparently it operates on a copy of the data without interfering with the proxy's main processing.

## Filters

### fork filter

The *fork* filter is one of the [Joint Filters](/tutorial/04-routing#joint-filters). It processes the current context and message in a sub-pipeline, but instead of using the output of the sub-pipeline as its own output, it passes its input through as its output. In other words, only the context can be affected, not the messages of the current pipeline.

Using *fork* is very simple. We just need to provide the target sub-pipeline:

```js
//...
.pipeline('request')
  .fork('log-request')

.pipeline('response')
  .fork('log-response')

.pipeline('log-request')

.pipeline('log-response')
```

> It is important to note that the *fork* filter will not continue the original pipeline until the sub-pipeline process specified by the *fork* filter is complete. Therefore, avoid time-consuming operations in sub-pipelines to reduce the impact on request performance.
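
The pass-through behaviour of *fork* can be illustrated with a small plain-JavaScript model. This is a sketch under assumptions, not Pipy code: `fork`, `logRequest`, and the shape of the context and message objects are hypothetical stand-ins for what Pipy does internally.

```javascript
// Plain-JavaScript model (not Pipy code) of the fork filter's data flow.
// fork, logRequest and the object shapes are hypothetical, for illustration only.
function fork(subPipeline) {
  return (context, message) => {
    // The sub-pipeline shares the context but gets its own copy of the message.
    // Whatever it returns is discarded.
    subPipeline(context, { ...message });
    // The fork stage passes its input through unchanged.
    return message;
  };
}

// A sub-pipeline that records something in the context and rewrites its copy.
const logRequest = (context, message) => {
  context.requestPath = message.head.path;
  message.body = 'overwritten inside the sub-pipeline';
  return 'this return value is ignored';
};

const forkContext = {};
const forkInput = { head: { path: '/ip' }, body: 'hello' };
const forkOutput = fork(logRequest)(forkContext, forkInput);

console.log(forkOutput === forkInput); // true: the input is the output
console.log(forkOutput.body);          // "hello": the original message is untouched
console.log(forkContext.requestPath);  // "/ip": the context was updated
```

The model shows why *fork* suits logging: the sub-pipeline can store what it needs in the context while the proxied message continues unchanged.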

### handleMessage filter

First we will record the received request:

```js
.pipeline('log-request')
  .handleMessageStart(
    () => _requestTime = Date.now()
  )
  .decompressHTTP()
  .handleMessage(
    '256k',
    msg => _request = msg
  )
```

The `Date.now()` function returns the current time. We use the `handleMessageStart` filter to process the `MessageStart` event and record the time at which the request was received.

After the complete message has been received, we record the whole request via the `handleMessage` filter:

* The first argument is the maximum size of the message body (when omitted, the default is `-1`, meaning unlimited). To avoid excessive memory usage, we set it to `'256k'`.
* The second argument is a callback function that receives the `Message` object as its argument.

### merge filter

Next, we need to record the response received from the target host:

```js
.pipeline('log-response')
  .handleMessageStart(
    () => _responseTime = Date.now()
  )
  .decompressHTTP()
  .replaceMessage(
    '256k',
    msg => (
      new Message(
        JSON.encode({
          req: {
            ..._request.head,
            body: _request.body.toString(),
          },
          res: {
            ...msg.head,
            body: msg.body.toString(),
          },
          reqTime: _requestTime,
          resTime: _responseTime,
          endTime: Date.now(),
          remoteAddr: __inbound.remoteAddress,
          remotePort: __inbound.remotePort,
          localAddr: __inbound.localAddress,
          localPort: __inbound.localPort,
          ...__logInfo,
        }).push('\n')
      )
    )
  )
  .merge('log-send', () => '')

.pipeline('log-send')
```

> We are using the *spread operator* here to copy the fields of each object into the log record.

In the *log-response* sub-pipeline, we record the response details. At this point we have all the data we need to build the log entry, so we encode it as JSON and wrap it in a new `Message` object.
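
To make the shape of the record easier to see, here is a plain-JavaScript sketch of the same assembly step. It is not Pipy code: `buildLogLine` and every sample value are hypothetical, and `JSON.stringify` stands in for Pipy's `JSON.encode`.

```javascript
// Plain-JavaScript sketch (not Pipy code) of how the log record is assembled.
// buildLogLine and all sample values are hypothetical, for illustration only.
function buildLogLine(request, response, times, inbound, logInfo) {
  const record = {
    // Spread copies every field of head (method, path, headers, ...) into req.
    req: { ...request.head, body: request.body },
    res: { ...response.head, body: response.body },
    reqTime: times.request,
    resTime: times.response,
    endTime: times.end,
    remoteAddr: inbound.remoteAddress,
    remotePort: inbound.remotePort,
    localAddr: inbound.localAddress,
    localPort: inbound.localPort,
    // Spread last: extra fields land at the top level of the record.
    ...logInfo,
  };
  // One JSON document per line, so a batch can be split on '\n' later.
  return JSON.stringify(record) + '\n';
}

const logLine = buildLogLine(
  { head: { method: 'GET', path: '/' }, body: '' },
  { head: { status: 200 }, body: 'hello\n' },
  { request: 1000, response: 1002, end: 1003 },
  { remoteAddress: '::1', remotePort: 53624, localAddress: '::1', localPort: 8000 },
  { service: 'demo' } // hypothetical extra field contributed by another plugin
);
console.log(logLine);
```

Because the extra fields are spread last, a key in `logInfo` with the same name as a built-in field would override it, which is worth keeping in mind when other plugins extend the record.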

Now that we have a `Message` object containing the logging information, should we send it to the logger right away? Sending logs one message at a time would hurt performance because of the I/O overhead of each report. Wouldn't it be much better to cache the logs of multiple requests and report them in batches?

This is why the *merge* filter is used to connect to the *log-send* sub-pipeline.

The *merge* filter is one of the [Joint Filters](/tutorial/04-routing#joint-filters), and it works similarly to both [Filter: muxHTTP](/tutorial/03-proxy#filter-muxhttp) and the *fork* filter.

Like *muxHTTP*, it combines multiple streams into one and sends it to a **shared** channel. Unlike *muxHTTP*, and like *fork*, it doesn't use the output of the sub-pipeline as its own output.

Its usage is also similar to that of *muxHTTP*: in addition to the target sub-pipeline, we need to provide a key. For details on why the key is needed, refer to [Part 4: Routing: Connection Sharing Problem](/tutorial/04-routing#connection-sharing-problem).
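
The role of the key can be pictured with a small plain-JavaScript model. This is a sketch under assumptions, not Pipy code: `createMerge` and the arrays standing in for shared sub-pipelines are hypothetical, and the model assumes that streams using the same key end up in the same shared sub-pipeline.

```javascript
// Plain-JavaScript model (not Pipy code) of merging streams by key.
// createMerge and the "shared pipeline" arrays are hypothetical, for illustration only.
function createMerge() {
  const sharedPipelines = new Map(); // key -> messages received by that shared pipeline
  return {
    send(key, message) {
      // The first message for a key creates the shared pipeline, later ones reuse it.
      if (!sharedPipelines.has(key)) sharedPipelines.set(key, []);
      sharedPipelines.get(key).push(message);
    },
    sharedPipelines,
  };
}

const merge = createMerge();
// Three different requests all use the same constant key, as in () => ''.
merge.send('', 'log line from request 1');
merge.send('', 'log line from request 2');
merge.send('', 'log line from request 3');

console.log(merge.sharedPipelines.size);           // 1
console.log(merge.sharedPipelines.get('').length); // 3
```

With a constant key such as `''`, every request feeds the same *log-send* sub-pipeline, which is what makes batching across requests possible.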

### pack filter

The *log-send* sub-pipeline receives messages from its parent pipeline and buffers them. The buffer has configuration options such as the batch size and a timeout. It is flushed to the configured target (the mock service in our case) as soon as either condition is met.

```js
.pipeline('log-send')
  .pack(
    1000,
    {
      timeout: 5,
    }
  )
  .replaceMessageStart(
    () => new MessageStart({
      method: 'POST',
      path: _logURL.path,
      headers: {
        'Host': _logURL.host,
        'Content-Type': 'application/json',
      }
    })
  )
  .encodeHTTPRequest()
  .connect(
    () => _logURL.host,
    {
      bufferLimit: '8m',
    }
  )
```
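
The "whichever comes first" rule can be modelled in plain JavaScript. This is a sketch, not Pipy code: `Batcher` is hypothetical, and the model assumes that the first argument of `pack` counts messages, that the timeout is expressed in seconds, and that it is measured from the oldest buffered message.

```javascript
// Plain-JavaScript model (not Pipy code) of size-or-timeout batching.
// Batcher is hypothetical; time is passed in explicitly to keep the example deterministic.
class Batcher {
  constructor(batchSize, timeoutSeconds, onFlush) {
    this.batchSize = batchSize;
    this.timeoutSeconds = timeoutSeconds;
    this.onFlush = onFlush;
    this.lines = [];
    this.firstAt = 0; // arrival time of the oldest buffered line
  }

  add(line, now) {
    if (this.lines.length === 0) this.firstAt = now;
    this.lines.push(line);
    // Condition 1: the batch is full.
    if (this.lines.length >= this.batchSize) this.flush();
  }

  tick(now) {
    // Condition 2: the oldest buffered line has waited long enough.
    if (this.lines.length > 0 && now - this.firstAt >= this.timeoutSeconds) this.flush();
  }

  flush() {
    const body = this.lines.join('');
    this.lines = [];
    this.onFlush(body);
  }
}

const flushed = [];
const batcher = new Batcher(3, 5, body => flushed.push(body));

batcher.add('a\n', 0);
batcher.add('b\n', 1);
batcher.add('c\n', 2); // third line fills the batch and triggers a flush
batcher.add('d\n', 10);
batcher.tick(12);      // only 2 seconds have passed, nothing happens
batcher.tick(15);      // 5 seconds have passed, the partial batch is flushed

console.log(JSON.stringify(flushed)); // prints ["a\nb\nc\n","d\n"]
```

In the tutorial's configuration the corresponding values are 1000 and 5, so under light traffic the timeout decides when logs are reported, and under heavy traffic the batch size does.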

## Test in action

Before starting our test, we need to start the mock logger service. Let's use Pipy itself to write a simple service that prints whatever it receives to the console.

```js
// mock.js
pipy()

.listen(8123)
  .serveHTTP(
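    msg => (
      // Print the received batch, then reply with a simple acknowledgement
      // (the reply body is an assumption, any response would do)
      console.log(`body: ${msg.body}`),
      new Message('OK')
    )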
  )
```

Run `pipy mock.js` to start the mock logger service, and start the proxy service as well. Once both are running, open a terminal and perform a test:

```
curl http://localhost:8000/ip
You are requesting / from ::ffff:127.0.0.1
```

In the mock service's terminal, you will see output like this:

```log
body: {"req":{"protocol":"HTTP/1.1","headers":{"host":"localhost:8000","user-agent":"curl/7.77.0","accept":"*/*"},"method":"GET","path":"/","body":""},"res":{"protocol":"HTTP/1.1","headers":{"connection":"keep-alive"},"status":200,"statusText":"OK","body":"You are requesting / from ::ffff:127.0.0.1\n"},"reqTime":1637938843619,"resTime":1637938843620,"endTime":1637938843620,"remoteAddr":"::1","remotePort":53624,"localAddr":"::1","localPort":8000}
```
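
Since every record is followed by a newline, a batch that contains several records can be split back into individual entries on the receiving side. The following plain-JavaScript sketch shows one way to do that. `parseLogBatch` and the shortened sample body are hypothetical and only illustrate the line-per-record layout.

```javascript
// Plain-JavaScript sketch of reading a batched report on the receiving side.
// parseLogBatch and the sample body are hypothetical, for illustration only.
function parseLogBatch(body) {
  return body
    .split('\n')
    .filter(line => line.trim() !== '') // skip the empty piece after the last newline
    .map(line => JSON.parse(line));
}

const batchBody =
  '{"req":{"method":"GET","path":"/"},"res":{"status":200},"reqTime":1000,"resTime":1002}\n' +
  '{"req":{"method":"GET","path":"/ip"},"res":{"status":200},"reqTime":1010,"resTime":1015}\n';

const records = parseLogBatch(batchBody);
for (const record of records) {
  // Time from receiving the request to receiving the response, in milliseconds.
  console.log(record.req.path, record.resTime - record.reqTime);
}
```

A real storage service would insert each parsed record instead of printing it.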

## Summary

We have covered a lot in this tutorial and learned to use new filters such as *fork*, *merge*, and *pack*.

### Takeaways

* The turn-down sub-pipelines (*response* in our case) run in the reverse of the order in which the *use* filter imports the plugins.
* The *fork* filter copies events to a sub-pipeline for processing and ignores the sub-pipeline's output. Its output is the input it receives.
* The *merge* filter is similar to *muxHTTP* in that it combines multiple streams into one and sends it to a **shared** channel. It differs in that, like *fork*, it doesn't use the output of the sub-pipeline as its own output.
* The *pack* filter buffers messages and supports configuration options such as the batch size and timeout.


### What's next?

Security is a critical part of any service and no one should ignore it. In the next part we will add JWT authentication.